/*-------------------------------------------------------------------------
 *
 * create_distributed_table.c
 *    Routines related to the creation of distributed relations.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "miscadmin.h"

#include "access/genam.h"
#include "access/hash.h"
#include "access/heapam.h"
#include "access/tableam.h"
#include "access/htup.h"
#include "access/nbtree.h"
#include "access/xact.h"
#include "access/ustore/knl_uscan.h"
#include "catalog/dependency.h"
#include "catalog/index.h"
#include "catalog/pg_am.h"
#include "catalog/pg_attrdef.h"
#include "catalog/pg_attribute.h"
#include "catalog/pg_enum.h"
#include "catalog/pg_extension.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_trigger.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "commands/extension.h"
#include "distributed/commands/citus_sequence.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/spi.h"
#include "nodes/execnodes.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "nodes/pg_list.h"
#include "parser/parse_expr.h"
#include "parser/parse_node.h"
#include "parser/parse_relation.h"
#include "parser/parser.h"
#include "postmaster/postmaster.h"
#include "storage/lmgr.h"
#include "tcop/pquery.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"

#include "pg_version_constants.h"

#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/deparser.h"
#include "distributed/distributed_execution_locks.h"
#include "distributed/distribution_column.h"
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/metadata/dependency.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata_utility.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/replicate_none_dist_table_shard.h"
#include "distributed/resource_lock.h"
#include "distributed/shard_cleaner.h"
#include "distributed/shard_rebalancer.h"
#include "distributed/shard_split.h"
#include "distributed/shard_transfer.h"
#include "distributed/shared_library_init.h"
#include "distributed/session_ctx.h"
#include "distributed/utils/distribution_column_map.h"
#include "distributed/version_compat.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_shard_visibility.h"
#include "distributed/worker_transaction.h"

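/*
 * tableam_scan_getnextslot fetches the next tuple from the given scan in the
 * given direction and stores it into the given slot, returning false when the
 * scan is exhausted. Ustore relations take a dedicated code path via
 * UHeapGetNextSlotGuts; other table access methods go through
 * tableam_scan_getnexttuple and ExecStoreTuple.
 */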
static bool tableam_scan_getnextslot(TableScanDesc sscan, ScanDirection direction,
                                     TupleTableSlot* slot)
{
    if (sscan->rs_rd->rd_tam_ops == TableAmUstore) {
        return (UHeapGetNextSlotGuts(sscan, direction, slot) != nullptr);
    }

    auto tuple = tableam_scan_getnexttuple(sscan, direction);
    if (tuple == nullptr) {
        return false;
    }

    ExecStoreTuple(tuple, slot, InvalidBuffer, false);
    return true;
}

/* common params that apply to all Citus table types */
typedef struct {
    char distributionMethod;
    char replicationModel;
} CitusTableParams;

/*
 * Params that only apply to distributed tables, i.e., the ones that are
 * known as DISTRIBUTED_TABLE by Citus metadata.
 */
typedef struct {
    int shardCount;
    bool shardCountIsStrict;
    char* distributionColumnName;
    ColocationParam colocationParam;
} DistributedTableParams;

/*
 * Once every LOG_PER_TUPLE_AMOUNT tuples, the progress of the copy is logged.
 */
#define LOG_PER_TUPLE_AMOUNT 1000000

/* local function forward declarations */
static void CreateDistributedTableConcurrently(Oid relationId,
                                               char* distributionColumnName,
                                               char distributionMethod,
                                               char* colocateWithTableName,
                                               int shardCount, bool shardCountIsStrict);
static char DecideDistTableReplicationModel(char distributionMethod,
                                            char* colocateWithTableName);
static List* HashSplitPointsForShardList(List* shardList);
static List* HashSplitPointsForShardCount(int shardCount);
static List* WorkerNodesForShardList(List* shardList);
static List* RoundRobinWorkerNodeList(List* workerNodeList, int listLength);
static CitusTableParams DecideCitusTableParams(
    CitusTableType tableType, DistributedTableParams* distributedTableParams);
static void CreateCitusTable(Oid relationId, CitusTableType tableType,
                             DistributedTableParams* distributedTableParams);
static void ConvertCitusLocalTableToTableType(
    Oid relationId, CitusTableType tableType,
    DistributedTableParams* distributedTableParams);
static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
                                             Oid colocatedTableId, bool localTableEmpty);
static void CreateSingleShardTableShard(Oid relationId, Oid colocatedTableId,
                                        uint32 colocationId);
static uint32 ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
                                      DistributedTableParams* distributedTableParams,
                                      Var* distributionColumn);
static void EnsureRelationCanBeDistributed(Oid relationId, Var* distributionColumn,
                                           char distributionMethod, uint32 colocationId,
                                           char replicationModel);
static void EnsureLocalTableEmpty(Oid relationId);
static void EnsureRelationHasNoTriggers(Oid relationId);
static Oid SupportFunctionForColumn(Var* partitionColumn, Oid accessMethodId,
                                    int16 supportFunctionNumber);
static void EnsureLocalTableEmptyIfNecessary(Oid relationId, char distributionMethod);
static bool ShouldLocalTableBeEmpty(Oid relationId, char distributionMethod);
static void EnsureCitusTableCanBeCreated(Oid relationOid);
static void PropagatePrerequisiteObjectsForDistributedTable(Oid relationId);
static void EnsureDistributedSequencesHaveOneType(Oid relationId, List* seqInfoList);
static void CopyLocalDataIntoShards(Oid distributedTableId);
static List* TupleDescColumnNameList(TupleDesc tupleDescriptor);

static bool DistributionColumnUsesNumericColumnNegativeScale(TupleDesc relationDesc,
                                                             Var* distributionColumn);
static int numeric_typmod_scale(int32 typmod);
static bool is_valid_numeric_typmod(int32 typmod);

static bool DistributionColumnUsesGeneratedStoredColumn(TupleDesc relationDesc,
                                                        Var* distributionColumn);
static bool CanUseExclusiveConnections(Oid relationId, bool localTableEmpty);
static uint64 DoCopyFromLocalTableIntoShards(Relation distributedRelation,
                                             DestReceiver* copyDest, TupleTableSlot* slot,
                                             EState* estate);
static void ErrorIfTemporaryTable(Oid relationId);
static void ErrorIfForeignTable(Oid relationOid);
static void SendAddLocalTableToMetadataCommandOutsideTransaction(Oid relationId);
static void EnsureDistributableTable(Oid relationId);
static void EnsureForeignKeysForDistributedTableConcurrently(Oid relationId);
static void EnsureColocateWithTableIsValid(Oid relationId, char distributionMethod,
                                           char* distributionColumnName,
                                           char* colocateWithTableName);
static void WarnIfTableHaveNoReplicaIdentity(Oid relationId);
static void ErrorIfUnloggedTable(Oid relationId);

/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(create_distributed_table_concurrently);
PG_FUNCTION_INFO_V1(create_distributed_table);
PG_FUNCTION_INFO_V1(create_reference_table);

extern "C" Datum create_reference_table(PG_FUNCTION_ARGS);
extern "C" Datum create_distributed_table(PG_FUNCTION_ARGS);
extern "C" Datum create_distributed_table_concurrently(PG_FUNCTION_ARGS);

/* colocation is disabled by openGauss */
char* DISABLE_COLOCATION_NAME = "none";

/*
 * create_distributed_table gets a table name, distribution column,
 * distribution method and colocate_with option, then it creates a
 * distributed table.
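 *
 * A minimal usage sketch (assuming the usual Citus UDF signature):
 *
 *   SELECT create_distributed_table('events', 'user_id', 'hash');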
 */
Datum create_distributed_table(PG_FUNCTION_ARGS)
{
    CheckCitusVersion(ERROR);

    if (PG_ARGISNULL(0)) {
        PG_RETURN_VOID();
    }

    Oid relationId = PG_GETARG_OID(0);
    text* distributionColumnText = PG_ARGISNULL(1) ? NULL : PG_GETARG_TEXT_P(1);
    Oid distributionMethodOid = PG_GETARG_OID(2);

    char* colocateWithTableName = DISABLE_COLOCATION_NAME;

    bool shardCountIsStrict = false;
    if (distributionColumnText) {
        if (PG_ARGISNULL(2)) {
            PG_RETURN_VOID();
        }

        int shardCount = Session_ctx::Vars().ShardCount;
        if (!PG_ARGISNULL(3)) {
            if (!IsColocateWithDefault(colocateWithTableName) &&
                !IsColocateWithNone(colocateWithTableName)) {
                ereport(ERROR, (errmsg("Cannot use colocate_with with a table "
                                       "and shard_count at the same time")));
            }

            shardCount = PG_GETARG_INT32(3);

            /*
             * If shard_count parameter is given, then we have to
             * make sure table has that many shards.
             */
            shardCountIsStrict = true;
        }

        char* distributionColumnName = text_to_cstring(distributionColumnText);
        Assert(distributionColumnName != NULL);

        char distributionMethod = LookupDistributionMethod(distributionMethodOid);

        if (shardCount < 1 || shardCount > MAX_SHARD_COUNT) {
            ereport(ERROR, (errmsg("%d is outside the valid range for "
                                   "parameter \"shard_count\" (1 .. %d)",
                                   shardCount, MAX_SHARD_COUNT)));
        }

        CreateDistributedTable(relationId, distributionColumnName, distributionMethod,
                               shardCount, shardCountIsStrict, colocateWithTableName);
    } else {
        if (!PG_ARGISNULL(3)) {
            ereport(ERROR, (errmsg("shard_count can't be specified when the "
                                   "distribution column is null because in "
                                   "that case it's automatically set to 1")));
        }

        if (!PG_ARGISNULL(2) &&
            LookupDistributionMethod(PG_GETARG_OID(2)) != DISTRIBUTE_BY_HASH) {
            /*
             * As we do for shard_count parameter, we could throw an error if
             * distribution_type is not NULL when creating a single-shard table.
             * However, this requires changing the default value of distribution_type
             * parameter to NULL and this would mean a breaking change for most
             * users because they're mostly using this API to create sharded
             * tables. For this reason, here we instead do nothing if the distribution
             * method is DISTRIBUTE_BY_HASH.
             */
            ereport(ERROR, (errmsg("distribution_type can't be specified "
                                   "when the distribution column is null")));
        }

        ColocationParam colocationParam = {
            .colocateWithTableName = colocateWithTableName,
            .colocationParamType = COLOCATE_WITH_TABLE_LIKE_OPT,
        };
        CreateSingleShardTable(relationId, colocationParam);
    }

    PG_RETURN_VOID();
}

/*
 * create_distributed_table_concurrently gets a table name, distribution column,
 * distribution method and colocate_with option, then it creates a
 * distributed table.
 */
Datum create_distributed_table_concurrently(PG_FUNCTION_ARGS)
{
    CheckCitusVersion(ERROR);

    if (PG_ARGISNULL(0) || PG_ARGISNULL(2)) {
        PG_RETURN_VOID();
    }

    if (PG_ARGISNULL(1)) {
        ereport(ERROR, (errmsg("cannot use create_distributed_table_concurrently "
                               "to create a distributed table with a null shard "
                               "key, consider using create_distributed_table()")));
    }

    Oid relationId = PG_GETARG_OID(0);
    text* distributionColumnText = PG_GETARG_TEXT_P(1);
    char* distributionColumnName = text_to_cstring(distributionColumnText);
    Oid distributionMethodOid = PG_GETARG_OID(2);
    char distributionMethod = LookupDistributionMethod(distributionMethodOid);

    char* colocateWithTableName = DISABLE_COLOCATION_NAME;

    bool shardCountIsStrict = false;
    int shardCount = Session_ctx::Vars().ShardCount;
    if (!PG_ARGISNULL(3)) {
        if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 &&
            pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0) {
            ereport(ERROR, (errmsg("Cannot use colocate_with with a table "
                                   "and shard_count at the same time")));
        }

        shardCount = PG_GETARG_INT32(3);

        /*
         * If the shard_count parameter is given, then we have to
         * make sure the table has that many shards.
         */
        shardCountIsStrict = true;
    }

    CreateDistributedTableConcurrently(relationId, distributionColumnName,
                                       distributionMethod, colocateWithTableName,
                                       shardCount, shardCountIsStrict);

    PG_RETURN_VOID();
}

/*
 * CreateDistributedTableConcurrently distributes a table by first converting
 * it to a Citus local table and then splitting the shard of the Citus local
 * table.
 *
 * If anything goes wrong during the second phase, the table is left as a
 * Citus local table.
 */
static void CreateDistributedTableConcurrently(Oid relationId,
                                               char* distributionColumnName,
                                               char distributionMethod,
                                               char* colocateWithTableName,
                                               int shardCount, bool shardCountIsStrict)
{
    ereport(ERROR, (errmsg("create_distributed_table_concurrently is not supported yet")));

    /*
     * We disallow create_distributed_table_concurrently in transaction blocks
     * because we cannot handle preceding writes, and we block writes at the
     * very end of the operation so the transaction should end immediately after.
     */
    // @FIXME IMPORTANT!
    // PreventInTransactionBlock(true, "create_distributed_table_concurrently");

    /*
     * Do not allow multiple create_distributed_table_concurrently operations in
     * the same transaction. We must perform this check here because a concurrent
     * local table conversion can cause issues.
     */
    ErrorIfMultipleNonblockingMoveSplitInTheSameTransaction();

    /* do not allow concurrent CreateDistributedTableConcurrently operations */
    AcquireCreateDistributedTableConcurrentlyLock(relationId);

    if (distributionMethod != DISTRIBUTE_BY_HASH) {
        ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                        errmsg("only hash-distributed tables can be distributed "
                               "without blocking writes")));
    }

    if (Session_ctx::Vars().ShardReplicationFactor > 1) {
        ereport(ERROR, (errmsg("cannot distribute a table concurrently when "
                               "spq.shard_replication_factor > 1")));
    }

    DropOrphanedResourcesInSeparateTransaction();

    EnsureCitusTableCanBeCreated(relationId);

    EnsureValidDistributionColumn(relationId, distributionColumnName);

    /*
     * Ensure table type is valid to be distributed. It should be either regular or citus
     * local table.
     */
    EnsureDistributableTable(relationId);

    /*
     * We rely on citus_add_local_table_to_metadata, which can emit messages that
     * are irrelevant here, so we want to error out with a user-friendly message
     * if foreign keys are not supported. We can miss foreign key violations
     * because we are not holding locks yet, so the relation can be modified until
     * we acquire the lock on it, but we do as much as we can to keep foreign key
     * violation messages user-friendly.
     */

    EnsureForeignKeysForDistributedTableConcurrently(relationId);

    char replicationModel =
        DecideDistTableReplicationModel(distributionMethod, colocateWithTableName);

    /*
     * We fail the transaction before local table conversion if the table cannot
     * be colocated with the given table. We repeat those checks after local table
     * conversion while holding locks on the relation, because the distribution
     * column can be modified in the meantime.
     */
    if (!IsColocateWithDefault(colocateWithTableName) &&
        !IsColocateWithNone(colocateWithTableName)) {
        if (replicationModel != REPLICATION_MODEL_STREAMING) {
            ereport(ERROR, (errmsg("cannot create distributed table "
                                   "concurrently because Citus allows "
                                   "concurrent table distribution only when "
                                   "spq.shard_replication_factor = 1"),
                            errhint("table %s is requested to be colocated "
                                    "with %s which has "
                                    "spq.shard_replication_factor > 1",
                                    get_rel_name(relationId), colocateWithTableName)));
        }

        EnsureColocateWithTableIsValid(relationId, distributionMethod,
                                       distributionColumnName, colocateWithTableName);
    }

    /*
     * Get name of the table before possibly replacing it in
     * citus_add_local_table_to_metadata.
     */
    char* tableName = get_rel_name(relationId);
    Oid schemaId = get_rel_namespace(relationId);
    char* schemaName = get_namespace_name(schemaId);
    RangeVar* rangeVar = makeRangeVar(schemaName, tableName, -1);

    /* If table is a regular table, then we need to add it into metadata. */
    if (!IsCitusTable(relationId)) {
        /*
         * Before taking locks, convert the table into a Citus local table and commit
         * to allow shard split to see the shard.
         */
        SendAddLocalTableToMetadataCommandOutsideTransaction(relationId);
    }

    /*
     * Lock target relation with a shard update exclusive lock to
     * block DDL, but not writes.
     *
     * If there was a concurrent drop/rename, error out by setting missingOK = false.
     */
    bool missingOK = false;
    relationId = RangeVarGetRelid(rangeVar, ShareUpdateExclusiveLock, missingOK);

    if (PartitionedTableNoLock(relationId)) {
        /* also lock partitions */
        LockPartitionRelations(relationId, ShareUpdateExclusiveLock);
    }

    WarnIfTableHaveNoReplicaIdentity(relationId);

    List* shardList = LoadShardIntervalList(relationId);

    /*
     * It's technically possible for the table to have been concurrently
     * distributed just after citus_add_local_table_to_metadata and just
     * before acquiring the lock, so double check.
     */
    if (list_length(shardList) != 1 || !IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) {
        ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                        errmsg("table was concurrently modified")));
    }

    /*
     * The table currently has one shard, we will split that shard to match the
     * target distribution.
     */
    ShardInterval* shardToSplit = (ShardInterval*)linitial(shardList);

    PropagatePrerequisiteObjectsForDistributedTable(relationId);

    /*
     * Re-evaluate the distribution column: it may have changed, because we did
     * not lock the relation during the earlier check before local table
     * conversion.
     */
    Var* distributionColumn =
        BuildDistributionKeyFromColumnName(relationId, distributionColumnName, NoLock);
    Oid distributionColumnType = distributionColumn->vartype;
    Oid distributionColumnCollation = distributionColumn->varcollid;

    /* get an advisory lock to serialize concurrent default group creations */
    if (IsColocateWithDefault(colocateWithTableName)) {
        AcquireColocationDefaultLock();
    }

    /*
     * At this stage, we only want to check for an existing co-location group.
     * We cannot create a new co-location group until after replication slot
     * creation in NonBlockingShardSplit.
     */
    uint32 colocationId = FindColocateWithColocationId(
        relationId, replicationModel, distributionColumnType, distributionColumnCollation,
        shardCount, shardCountIsStrict, colocateWithTableName);

    if (IsColocateWithDefault(colocateWithTableName) &&
        (colocationId != INVALID_COLOCATION_ID)) {
        /*
         * We can release the advisory lock if there is already a default entry
         * for the given params; otherwise, we keep it to prevent concurrent
         * operations from creating a different default colocation entry.
         */
        ReleaseColocationDefaultLock();
    }

    EnsureRelationCanBeDistributed(relationId, distributionColumn, distributionMethod,
                                   colocationId, replicationModel);

    Oid colocatedTableId = InvalidOid;
    if (colocationId != INVALID_COLOCATION_ID) {
        colocatedTableId = ColocatedTableId(colocationId);
    }

    List* workerNodeList = DistributedTablePlacementNodeList(NoLock);
    if (workerNodeList == NIL) {
        ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                        errmsg("no worker nodes are available for placing shards"),
                        errhint("Add more worker nodes.")));
    }

    List* workersForPlacementList;
    List* shardSplitPointsList;

    if (colocatedTableId != InvalidOid) {
        List* colocatedShardList = LoadShardIntervalList(colocatedTableId);

        /*
         * Match the shard ranges of an existing table.
         */
        shardSplitPointsList = HashSplitPointsForShardList(colocatedShardList);

        /*
         * Find the node IDs of the shard placements.
         */
        workersForPlacementList = WorkerNodesForShardList(colocatedShardList);
    } else {
        /*
         * Generate a new set of #shardCount shards.
         */
        shardSplitPointsList = HashSplitPointsForShardCount(shardCount);

        /*
         * Place shards in a round-robin fashion across all data nodes.
         */
        workersForPlacementList = RoundRobinWorkerNodeList(workerNodeList, shardCount);
    }

    /*
     * Make sure that existing reference tables have been replicated to all the nodes
     * such that we can create foreign keys and joins work immediately after creation.
     * We do this after applying all essential checks to error out early in case of
     * user error.
     *
     * Use force_logical since this function is meant to not block writes.
     */
    EnsureReferenceTablesExistOnAllNodesExtended(TRANSFER_MODE_FORCE_LOGICAL);

    /*
     * At this point, the table is a Citus local table, which means it does
     * not have a partition column in the metadata. However, we cannot update
     * the metadata here because that would prevent us from creating a replication
     * slot to copy ongoing changes. Instead, we pass a hash that maps relation
     * IDs to partition column vars.
     */
    DistributionColumnMap* distributionColumnOverrides = CreateDistributionColumnMap();
    AddDistributionColumnForRelation(distributionColumnOverrides, relationId,
                                     distributionColumnName);

    /*
     * There are no colocation entries yet for the local table, so we check
     * whether the table has any partitions and add them to the same colocation
     * group.
     */
    List* sourceColocatedShardIntervalList = ListShardsUnderParentRelation(relationId);

    SplitMode splitMode = NON_BLOCKING_SPLIT;
    SplitOperation splitOperation = CREATE_DISTRIBUTED_TABLE;
    SplitShard(splitMode, splitOperation, shardToSplit->shardId, shardSplitPointsList,
               workersForPlacementList, distributionColumnOverrides,
               sourceColocatedShardIntervalList, colocationId);
}

/*
 * EnsureForeignKeysForDistributedTableConcurrently ensures that referenced and
 * referencing foreign keys for the given table are supported.
 *
 * We allow distributed -> reference
 *          distributed -> citus local
 *
 * We disallow reference   -> distributed
 *             citus local -> distributed
 *             regular     -> distributed
 *
 * Normally regular -> distributed is allowed, but not when we create the
 * distributed table concurrently, because we rely on converting the regular
 * table to a citus local table, which errors out with an unfriendly message.
 */
static void EnsureForeignKeysForDistributedTableConcurrently(Oid relationId)
{
    /*
     * disallow citus local -> distributed fkeys.
     * disallow reference   -> distributed fkeys.
     * disallow regular     -> distributed fkeys.
     */
    EnsureNoFKeyFromTableType(relationId, INCLUDE_CITUS_LOCAL_TABLES |
                                              INCLUDE_REFERENCE_TABLES |
                                              INCLUDE_LOCAL_TABLES);

    /*
     * disallow distributed -> regular fkeys.
     */
    EnsureNoFKeyToTableType(relationId, INCLUDE_LOCAL_TABLES);
}

/*
 * EnsureColocateWithTableIsValid ensures given relation can be colocated with the table
 * of given name.
 */
static void EnsureColocateWithTableIsValid(Oid relationId, char distributionMethod,
                                           char* distributionColumnName,
                                           char* colocateWithTableName)
{
    char replicationModel =
        DecideDistTableReplicationModel(distributionMethod, colocateWithTableName);

    /*
     * We fail the transaction before local table conversion if the table cannot
     * be colocated with the given table. We repeat those checks after local table
     * conversion while holding locks on the relation, because the distribution
     * column can be modified in the meantime.
     */
    Oid distributionColumnType =
        ColumnTypeIdForRelationColumnName(relationId, distributionColumnName);

    text* colocateWithTableNameText = cstring_to_text(colocateWithTableName);
    Oid colocateWithTableId = ResolveRelationId(colocateWithTableNameText, false);
    EnsureTableCanBeColocatedWith(relationId, replicationModel, distributionColumnType,
                                  colocateWithTableId);
}

/*
 * AcquireCreateDistributedTableConcurrentlyLock does not allow concurrent
 * create_distributed_table_concurrently operations.
 */
void AcquireCreateDistributedTableConcurrentlyLock(Oid relationId)
{
    LOCKTAG tag;
    const bool sessionLock = false;
    const bool dontWait = true;

    SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_CREATE_DISTRIBUTED_TABLE_CONCURRENTLY);

    LockAcquireResult lockAcquired =
        LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait);
    if (!lockAcquired) {
        ereport(ERROR, (errmsg("another create_distributed_table_concurrently "
                               "operation is in progress"),
                        errhint("Make sure that the concurrent operation has "
                                "finished and re-run the command")));
    }
}

/*
 * SendAddLocalTableToMetadataCommandOutsideTransaction executes metadata add local
 * table command locally to avoid deadlock.
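 *
 * The commands sent over that separate connection look like:
 *
 *   SET LOCAL spq.allow_nested_distributed_execution to ON;
 *   SELECT pg_catalog.citus_add_local_table_to_metadata('schema.table');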
 */
static void SendAddLocalTableToMetadataCommandOutsideTransaction(Oid relationId)
{
    char* qualifiedRelationName = generate_qualified_relation_name(relationId);

    /*
     * we need to allow nested distributed execution, because we start a new distributed
     * execution inside the pushed-down UDF citus_add_local_table_to_metadata. Normally
     * citus does not allow that because it cannot guarantee correctness.
     */
    StringInfo allowNestedDistributionCommand = makeStringInfo();
    appendStringInfo(allowNestedDistributionCommand,
                     "SET LOCAL spq.allow_nested_distributed_execution to ON");

    StringInfo addLocalTableToMetadataCommand = makeStringInfo();
    appendStringInfo(addLocalTableToMetadataCommand,
                     "SELECT pg_catalog.citus_add_local_table_to_metadata(%s)",
                     quote_literal_cstr(qualifiedRelationName));

    List* commands = list_make2(allowNestedDistributionCommand->data,
                                addLocalTableToMetadataCommand->data);
    char* username = NULL;
    SendCommandListToWorkerOutsideTransaction(Session_ctx::Vars().LocalHostName,
                                              g_instance.attr.attr_network.PostPortNumber,
                                              username, commands);
}

/*
 * WarnIfTableHaveNoReplicaIdentity notifies the user if the given table or its
 * partitions (if any) do not have a replica identity, which is required for
 * logical replication to replicate UPDATE and DELETE commands during
 * create_distributed_table_concurrently.
 */
void WarnIfTableHaveNoReplicaIdentity(Oid relationId)
{
    bool foundRelationWithNoReplicaIdentity =
        !RelationCanPublishAllModifications(relationId);

    if (foundRelationWithNoReplicaIdentity) {
        char* relationName = get_rel_name(relationId);

        ereport(
            NOTICE,
            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
             errmsg("relation %s does not have a REPLICA "
                    "IDENTITY or PRIMARY KEY",
                    relationName),
             errdetail("UPDATE and DELETE commands on the relation will "
                       "error out during create_distributed_table_concurrently unless "
                       "there is a REPLICA IDENTITY or PRIMARY KEY. "
                       "INSERT commands will still work.")));
    }
}

/*
 * HashSplitPointsForShardList returns a list of split points which match
 * the shard ranges of the given list of shards.
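 *
 * For example, for two shards with max hash values [-1, 2147483647], the
 * returned list is [-1]: the last upper bound is dropped because split points
 * only mark internal shard boundaries.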
 */
static List* HashSplitPointsForShardList(List* shardList)
{
    List* splitPointList = NIL;

    ShardInterval* shardInterval = NULL;
    foreach_declared_ptr(shardInterval, shardList)
    {
        int32 shardMaxValue = DatumGetInt32(shardInterval->maxValue);

        splitPointList = lappend_int(splitPointList, shardMaxValue);
    }

    /*
     * Split point lists only include the upper boundaries.
     */
    splitPointList = list_delete_last(splitPointList);

    return splitPointList;
}

/*
 * HashSplitPointsForShardCount returns a list of split points for a given
 * shard count with roughly equal hash ranges.
 */
static List* HashSplitPointsForShardCount(int shardCount)
{
    List* splitPointList = NIL;

    /* calculate the split of the hash space */
    uint64 hashTokenIncrement = HASH_TOKEN_COUNT / shardCount;

    /*
     * Split points lists only include the upper boundaries, so we only
     * go up to shardCount - 1 and do not have to apply the correction
     * for the last shardmaxvalue.
     */
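
    /*
     * For example, with shardCount = 4 (and assuming HASH_TOKEN_COUNT covers
     * the full int32 range, i.e. 2^32), hashTokenIncrement is 2^30 and the
     * returned split points are [-1073741825, -1, 1073741823], i.e. the upper
     * bounds of the first three shards.
     */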
    for (int64 shardIndex = 0; shardIndex < shardCount - 1; shardIndex++) {
        /* initialize the hash token space for this shard */
        int32 shardMinValue = PG_INT32_MIN + (shardIndex * hashTokenIncrement);
        int32 shardMaxValue = shardMinValue + (hashTokenIncrement - 1);

        splitPointList = lappend_int(splitPointList, shardMaxValue);
    }

    return splitPointList;
}

/*
 * WorkerNodesForShardList returns a list of node ids reflecting the locations of
 * the given list of shards.
 */
static List* WorkerNodesForShardList(List* shardList)
{
    List* nodeIdList = NIL;

    ShardInterval* shardInterval = NULL;
    foreach_declared_ptr(shardInterval, shardList)
    {
        WorkerNode* workerNode = ActiveShardPlacementWorkerNode(shardInterval->shardId);
        nodeIdList = lappend_int(nodeIdList, workerNode->nodeId);
    }

    return nodeIdList;
}

/*
 * RoundRobinWorkerNodeList round robins over the workers in the worker node list
 * and adds node ids to a list of length listLength.
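 *
 * For example, with worker node ids [1, 2] and listLength = 5, the result is
 * [1, 2, 1, 2, 1].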
 */
static List* RoundRobinWorkerNodeList(List* workerNodeList, int listLength)
{
    Assert(workerNodeList != NIL);

    List* nodeIdList = NIL;

    for (int idx = 0; idx < listLength; idx++) {
        int nodeIdx = idx % list_length(workerNodeList);
        WorkerNode* workerNode = (WorkerNode*)list_nth(workerNodeList, nodeIdx);
        nodeIdList = lappend_int(nodeIdList, workerNode->nodeId);
    }

    return nodeIdList;
}

/*
 * create_reference_table creates a distributed table with the given relationId. The
 * created table has one shard and replication factor is set to the active worker
 * count. In fact, the above is the definition of a reference table in Citus.
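 *
 * A minimal usage sketch (assuming the usual Citus UDF signature):
 *
 *   SELECT create_reference_table('countries');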
 */
Datum create_reference_table(PG_FUNCTION_ARGS)
{
    CheckCitusVersion(ERROR);
    Oid relationId = PG_GETARG_OID(0);

    CreateReferenceTable(relationId);
    PG_RETURN_VOID();
}

/*
 * EnsureCitusTableCanBeCreated checks if
 * - we are on the coordinator
 * - the current user is the owner of the table
 * - relation kind is supported
 * - relation is not a shard
 */
static void EnsureCitusTableCanBeCreated(Oid relationOid)
{
    EnsureCoordinator();
    EnsureRelationExists(relationOid);
    EnsureTableOwner(relationOid);
    ErrorIfTemporaryTable(relationOid);
    ErrorIfForeignTable(relationOid);
    ErrorIfUnloggedTable(relationOid);

    /*
     * We should do this check here since the codes in the following lines rely
     * on this relation to have a supported relation kind. More extensive checks
     * will be performed in CreateDistributedTable.
     */
    EnsureRelationKindSupported(relationOid);

    /*
     * When coordinator is added to the metadata, or on the workers,
     * some of the relations of the coordinator node may/will be shards.
     * We disallow creating distributed tables from shard relations, by
     * erroring out here.
     */
    ErrorIfRelationIsAKnownShard(relationOid);
}

/*
 * EnsureRelationExists does a basic check on whether the OID belongs to
 * an existing relation.
 */
void EnsureRelationExists(Oid relationId)
{
    if (!RelationExists(relationId)) {
        ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                        errmsg("relation with OID %u does not exist", relationId)));
    }
}

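/*
 * EnsureForeignKeyNotExists errors out if the given relation has a foreign
 * key, since foreign keys are not supported when creating distributed tables.
 */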
void EnsureForeignKeyNotExists(Oid relationId)
{
    if (HasForeignKey(relationId)) {
        ereport(ERROR, (errmsg("Foreign keys are not supported "
                               "when creating distributed tables")));
    }
}

/*
 * CreateDistributedTable is a wrapper around CreateCitusTable that creates a
 * distributed table with the given distribution method.
 */
void CreateDistributedTable(Oid relationId, char* distributionColumnName,
                            char distributionMethod, int shardCount,
                            bool shardCountIsStrict, char* colocateWithTableName)
{
    CitusTableType tableType;
    switch (distributionMethod) {
        case DISTRIBUTE_BY_HASH: {
            tableType = HASH_DISTRIBUTED;
            break;
        }

        case DISTRIBUTE_BY_APPEND: {
            tableType = APPEND_DISTRIBUTED;
            break;
        }

        case DISTRIBUTE_BY_RANGE: {
            tableType = RANGE_DISTRIBUTED;
            break;
        }

        default: {
            ereport(ERROR, (errmsg("unexpected distribution method when "
                                   "deciding Citus table type")));
            break;
        }
    }

    DistributedTableParams distributedTableParams = {
        .shardCount = shardCount,
        .shardCountIsStrict = shardCountIsStrict,
        .distributionColumnName = distributionColumnName,
        .colocationParam = {.colocateWithTableName = colocateWithTableName,
                            .colocationParamType = COLOCATE_WITH_TABLE_LIKE_OPT}};
    EnsureForeignKeyNotExists(relationId);
    CreateCitusTable(relationId, tableType, &distributedTableParams);
}

/*
 * CreateReferenceTable creates a reference table.
 */
void CreateReferenceTable(Oid relationId)
{
    if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) {
        /*
         * Create the shard of given Citus local table on workers to convert
         * it into a reference table.
         */
        ConvertCitusLocalTableToTableType(relationId, REFERENCE_TABLE, NULL);
    } else {
        CreateCitusTable(relationId, REFERENCE_TABLE, NULL);
    }
}

/*
 * CreateSingleShardTable creates a single shard distributed table that
 * doesn't have a shard key.
 */
void CreateSingleShardTable(Oid relationId, ColocationParam colocationParam)
{
    DistributedTableParams distributedTableParams = {.shardCount = 1,
                                                     .shardCountIsStrict = true,
                                                     .distributionColumnName = NULL,
                                                     .colocationParam = colocationParam};

    if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) {
        /*
         * Create the shard of given Citus local table on appropriate node
         * and drop the local one to convert it into a single-shard distributed
         * table.
         */
        ConvertCitusLocalTableToTableType(relationId, SINGLE_SHARD_DISTRIBUTED,
                                          &distributedTableParams);
    } else {
        CreateCitusTable(relationId, SINGLE_SHARD_DISTRIBUTED, &distributedTableParams);
    }
}

/*
 * CreateCitusTable is the internal method that creates a Citus table in
 * given configuration.
 *
 * DistributedTableParams should be non-null only if we're creating a distributed
 * table.
 *
 * This function contains all the logic necessary to create distributed tables.
 * It performs the checks required to ensure distributing the table is safe. If
 * it is safe to distribute the table, this function creates distributed table
 * metadata, creates shards, and copies local data into the shards. It also
 * handles partitioned tables by distributing their partitions as well.
 */
static void CreateCitusTable(Oid relationId, CitusTableType tableType,
                             DistributedTableParams* distributedTableParams)
{
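    /*
     * Cross-check the arguments: distributed table types must come with
     * DistributedTableParams, and all other table types must not.
     */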
    if ((tableType == HASH_DISTRIBUTED || tableType == APPEND_DISTRIBUTED ||
         tableType == RANGE_DISTRIBUTED || tableType == SINGLE_SHARD_DISTRIBUTED) !=
        (distributedTableParams != NULL)) {
        ereport(ERROR, (errmsg("distributed table params must be provided "
                               "when creating a distributed table and must "
                               "not be provided otherwise")));
    }

    EnsureCitusTableCanBeCreated(relationId);

    /* allow creating a Citus table on an empty cluster */
    InsertCoordinatorIfClusterEmpty();

    Relation relation = try_relation_open(relationId, ExclusiveLock);
    if (relation == NULL) {
        ereport(ERROR, (errmsg("could not create spq table: "
                               "relation does not exist")));
    }

    relation_close(relation, NoLock);

    if (tableType == SINGLE_SHARD_DISTRIBUTED &&
        Session_ctx::Vars().ShardReplicationFactor > 1) {
        ereport(ERROR, (errmsg("could not create single shard table: "
                               "spq.shard_replication_factor is greater than 1"),
                        errhint("Consider setting spq.shard_replication_factor to 1 "
                                "and try again")));
    }

    /*
     * EnsureTableNotDistributed errors out when relation is a citus table but
     * we don't want to ask user to first undistribute their citus local tables
     * when creating distributed tables from them.
     * For this reason, here we undistribute citus local tables beforehand.
     * But since UndistributeTable does not support undistributing relations
     * involved in foreign key relationships, we first drop foreign keys that
     * given relation is involved, then we undistribute the relation and finally
     * we re-create dropped foreign keys at the end of this function.
     */
    List* originalForeignKeyRecreationCommands = NIL;
    if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) {
        /*
         * We use ConvertCitusLocalTableToTableType instead of CreateCitusTable
         * to create a reference table or a single-shard table from a Citus
         * local table.
         */
        Assert(tableType != REFERENCE_TABLE && tableType != SINGLE_SHARD_DISTRIBUTED);

        /* store foreign key creation commands that relation is involved */
        originalForeignKeyRecreationCommands =
            GetFKeyCreationCommandsRelationInvolvedWithTableType(relationId,
                                                                 INCLUDE_ALL_TABLE_TYPES);
        relationId = DropFKeysAndUndistributeTable(relationId);
    }
    /*
     * To support foreign keys between reference tables and local tables,
     * we drop & re-define foreign keys at the end of this function so
     * that ALTER TABLE hook does the necessary job, which means converting
     * local tables to citus local tables to properly support such foreign
     * keys.
     */
    else if (tableType == REFERENCE_TABLE && ShouldEnableLocalReferenceForeignKeys() &&
             HasForeignKeyWithLocalTable(relationId)) {
        /*
         * Store foreign key creation commands for foreign key relationships
         * that relation has with postgres tables.
         */
        originalForeignKeyRecreationCommands =
            GetFKeyCreationCommandsRelationInvolvedWithTableType(relationId,
                                                                 INCLUDE_LOCAL_TABLES);

        /*
         * Soon we will convert local tables to citus local tables. As
         * CreateCitusLocalTable needs to use local execution, now we
         * switch to local execution beforehand so that reference table
         * creation doesn't use remote execution and we don't error out
         * in CreateCitusLocalTable.
         */
        SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED);

        DropFKeysRelationInvolvedWithTableType(relationId, INCLUDE_LOCAL_TABLES);
    }

    LockRelationOid(relationId, ExclusiveLock);

    EnsureTableNotDistributed(relationId);

    PropagatePrerequisiteObjectsForDistributedTable(relationId);

    Var* distributionColumn = NULL;
    if (distributedTableParams && distributedTableParams->distributionColumnName) {
        distributionColumn = BuildDistributionKeyFromColumnName(
            relationId, distributedTableParams->distributionColumnName, NoLock);
    }

    CitusTableParams citusTableParams =
        DecideCitusTableParams(tableType, distributedTableParams);

    uint32 colocationId = INVALID_COLOCATION_ID;
    if (distributedTableParams &&
        distributedTableParams->colocationParam.colocationParamType ==
            COLOCATE_WITH_COLOCATION_ID) {
        colocationId = distributedTableParams->colocationParam.colocationId;
    } else {
        /*
         * ColocationIdForNewTable assumes caller acquires lock on relationId. In our
         * case, our caller already acquired lock on relationId.
         */
        colocationId = ColocationIdForNewTable(
            relationId, tableType, distributedTableParams, distributionColumn);
    }

    EnsureRelationCanBeDistributed(relationId, distributionColumn,
                                   citusTableParams.distributionMethod, colocationId,
                                   citusTableParams.replicationModel);

    /*
     * Make sure that existing reference tables have been replicated to all the nodes
     * such that we can create foreign keys and joins work immediately after creation.
     *
     * This will take a lock on the nodes to make sure no nodes are added after we have
     * verified and ensured the reference tables are copied everywhere.
     * Although copying reference tables here is only strictly needed when creating
     * a new colocation group, avoiding it in the other cases requires significant
     * refactoring which we don't want to perform now.
     */
    EnsureReferenceTablesExistOnAllNodes();

    /*
     * While adding tables to a colocation group we need to make sure no concurrent
     * mutations happen on the colocation group with regards to its placements. It is
     * important that we have already copied any reference tables before acquiring this
     * lock as these are competing operations.
     */
    LockColocationId(colocationId, ShareLock);

    /* we need to calculate these variables before creating distributed metadata */
    bool localTableEmpty = TableEmpty(relationId);
    Oid colocatedTableId = ColocatedTableId(colocationId);

    /* setting to false since this flag is only valid for citus local tables */
    bool autoConverted = false;

    /* create an entry for distributed table in pg_dist_partition */
    InsertIntoPgDistPartition(relationId, citusTableParams.distributionMethod,
                              distributionColumn, colocationId,
                              citusTableParams.replicationModel, autoConverted);

#if PG_VERSION_NUM >= PG_VERSION_16

    /*
     * PG16+ supports truncate triggers on foreign tables
     */
    if (RegularTable(relationId) || IsForeignTable(relationId))
#else

    /* foreign tables do not support TRUNCATE trigger */
    if (RegularTable(relationId))
#endif
    {
        CreateTruncateTrigger(relationId);
    }

    if (tableType == HASH_DISTRIBUTED) {
        /* create shards for hash distributed table */
        CreateHashDistributedTableShards(relationId, distributedTableParams->shardCount,
                                         colocatedTableId, localTableEmpty);
    } else if (tableType == REFERENCE_TABLE) {
        /* create shards for reference table */
        CreateReferenceTableShard(relationId);
    } else if (tableType == SINGLE_SHARD_DISTRIBUTED) {
        /* create the shard of given single-shard distributed table */
        CreateSingleShardTableShard(relationId, colocatedTableId, colocationId);
    }

    if (ShouldSyncTableMetadata(relationId)) {
        SyncCitusTableMetadata(relationId);
    }

    /*
     * We've a custom way of foreign key graph invalidation,
     * see InvalidateForeignKeyGraph().
     */
    if (TableReferenced(relationId) || TableReferencing(relationId)) {
        InvalidateForeignKeyGraph();
    }

    /* copy over data for hash distributed and reference tables */
    if (tableType == HASH_DISTRIBUTED || tableType == SINGLE_SHARD_DISTRIBUTED ||
        tableType == REFERENCE_TABLE) {
        if (RegularTable(relationId)) {
            CopyLocalDataIntoShards(relationId);
        }
    }

    /*
     * Now recreate foreign keys that we dropped beforehand. As modifications are not
     * allowed on the relations that are involved in the foreign key relationship,
     * we can skip the validation of the foreign keys.
     */
    bool skip_validation = true;
    ExecuteForeignKeyCreateCommandList(originalForeignKeyRecreationCommands,
                                       skip_validation);
}

/*
 * ConvertCitusLocalTableToTableType converts given Citus local table to
 * given table type.
 *
 * This only supports converting Citus local tables to reference tables
 * (by replicating the shard to workers) and single-shard distributed
 * tables (by replicating the shard to the appropriate worker and dropping
 * the local one).
 */
static void ConvertCitusLocalTableToTableType(
    Oid relationId, CitusTableType tableType,
    DistributedTableParams* distributedTableParams)
{
    if (!IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) {
        ereport(ERROR, (errmsg("table is not a local table added to metadata")));
    }

    if (tableType != REFERENCE_TABLE && tableType != SINGLE_SHARD_DISTRIBUTED) {
        ereport(ERROR, (errmsg("table type is not supported for conversion")));
    }

    if ((tableType == SINGLE_SHARD_DISTRIBUTED) != (distributedTableParams != NULL)) {
        ereport(ERROR, (errmsg("distributed table params must be provided "
                               "when creating a distributed table and must "
                               "not be provided otherwise")));
    }

    EnsureCitusTableCanBeCreated(relationId);

    Relation relation = try_relation_open(relationId, ExclusiveLock);
    if (relation == NULL) {
        ereport(ERROR, (errmsg("could not create Citus table: "
                               "relation does not exist")));
    }

    relation_close(relation, NoLock);

    if (tableType == SINGLE_SHARD_DISTRIBUTED &&
        Session_ctx::Vars().ShardReplicationFactor > 1) {
        ereport(ERROR, (errmsg("could not create single shard table: "
                               "spq.shard_replication_factor is greater than 1"),
                        errhint("Consider setting spq.shard_replication_factor to 1 "
                                "and try again")));
    }

    LockRelationOid(relationId, ExclusiveLock);

    Var* distributionColumn = NULL;
    CitusTableParams citusTableParams =
        DecideCitusTableParams(tableType, distributedTableParams);

    uint32 colocationId = INVALID_COLOCATION_ID;
    if (distributedTableParams &&
        distributedTableParams->colocationParam.colocationParamType ==
            COLOCATE_WITH_COLOCATION_ID) {
        colocationId = distributedTableParams->colocationParam.colocationId;
    } else {
        colocationId = ColocationIdForNewTable(
            relationId, tableType, distributedTableParams, distributionColumn);
    }

    /* check constraints etc. on table based on new distribution params */
    EnsureRelationCanBeDistributed(relationId, distributionColumn,
                                   citusTableParams.distributionMethod, colocationId,
                                   citusTableParams.replicationModel);

    /*
     * Regarding the foreign key relationships that given relation is involved,
     * EnsureRelationCanBeDistributed() only checks the ones where the relation
     * is the referencing table. And given that the table at hand is a Citus
     * local table, right now it may only be referenced by a reference table
 * or a Citus local table. However, given that neither of those two cases
 * is applicable for a distributed table, here we throw an error if
     * that's the case.
     *
     * Note that we don't need to check the same if we're creating a reference
     * table from a Citus local table because all the foreign keys referencing
     * Citus local tables are supported by reference tables.
     */
    if (tableType == SINGLE_SHARD_DISTRIBUTED) {
        EnsureNoFKeyFromTableType(relationId,
                                  INCLUDE_CITUS_LOCAL_TABLES | INCLUDE_REFERENCE_TABLES);
    }

    EnsureReferenceTablesExistOnAllNodes();

    LockColocationId(colocationId, ShareLock);

    /*
     * When converting to a single shard table, we want to drop the placement
     * on the coordinator, but only if transferring to a different node. In that
     * case, shouldDropLocalPlacement is true. When converting to a reference
     * table, we always keep the placement on the coordinator, so for reference
     * tables shouldDropLocalPlacement is always false.
     */
    bool shouldDropLocalPlacement = false;

    List* targetNodeList = NIL;
    if (tableType == SINGLE_SHARD_DISTRIBUTED) {
        uint32 targetNodeId = SingleShardTableColocationNodeId(colocationId);
        if (targetNodeId != CoordinatorNodeIfAddedAsWorkerOrError()->nodeId) {
            bool missingOk = false;
            WorkerNode* targetNode = FindNodeWithNodeId(targetNodeId, missingOk);
            targetNodeList = list_make1(targetNode);

            shouldDropLocalPlacement = true;
        }
    } else if (tableType == REFERENCE_TABLE) {
        targetNodeList = ActivePrimaryNonCoordinatorNodeList(ShareLock);
        targetNodeList = SortList(targetNodeList, CompareWorkerNodes);
    }

    bool autoConverted = false;
    UpdateNoneDistTableMetadataGlobally(relationId, citusTableParams.replicationModel,
                                        colocationId, autoConverted);

    /* create the shard placement on workers and insert into pg_dist_placement globally */
    if (list_length(targetNodeList) > 0) {
        NoneDistTableReplicateCoordinatorPlacement(relationId, targetNodeList);
    }

    if (shouldDropLocalPlacement) {
        /*
         * We don't yet drop the local placement before handling partitions.
         * Otherwise, local shard placements of the partitions will be gone
         * before we create them on workers.
         *
         * However, we need to delete the related entry from pg_dist_placement
         * before distributing partitions (if any) because we need a sane metadata
         * state before doing so.
         */
        NoneDistTableDeleteCoordinatorPlacement(relationId);
    }

    /* if this table is partitioned table, distribute its partitions too */
    if (PartitionedTable(relationId)) {
        ereport(ERROR, (errmsg("distributing partitioned tables is not supported yet")));
    }

    if (shouldDropLocalPlacement) {
        NoneDistTableDropCoordinatorPlacementTable(relationId);
    }
}

/*
 * DecideCitusTableParams decides CitusTableParams based on given CitusTableType
 * and DistributedTableParams if it's a distributed table.
 *
 * DistributedTableParams should be non-null only if CitusTableType corresponds
 * to a distributed table.
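 *
 * For example, REFERENCE_TABLE maps to (DISTRIBUTE_BY_NONE,
 * REPLICATION_MODEL_2PC), while SINGLE_SHARD_DISTRIBUTED maps to
 * (DISTRIBUTE_BY_NONE, REPLICATION_MODEL_STREAMING).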
 */
static CitusTableParams DecideCitusTableParams(
    CitusTableType tableType, DistributedTableParams* distributedTableParams)
{
    CitusTableParams citusTableParams = {0};
    switch (tableType) {
        case HASH_DISTRIBUTED: {
            Assert(distributedTableParams->colocationParam.colocationParamType ==
                   COLOCATE_WITH_TABLE_LIKE_OPT);

            citusTableParams.distributionMethod = DISTRIBUTE_BY_HASH;
            citusTableParams.replicationModel = DecideDistTableReplicationModel(
                DISTRIBUTE_BY_HASH,
                distributedTableParams->colocationParam.colocateWithTableName);
            break;
        }

        case APPEND_DISTRIBUTED: {
            Assert(distributedTableParams->colocationParam.colocationParamType ==
                   COLOCATE_WITH_TABLE_LIKE_OPT);

            citusTableParams.distributionMethod = DISTRIBUTE_BY_APPEND;
            citusTableParams.replicationModel = DecideDistTableReplicationModel(
                APPEND_DISTRIBUTED,
                distributedTableParams->colocationParam.colocateWithTableName);
            break;
        }

        case RANGE_DISTRIBUTED: {
            Assert(distributedTableParams->colocationParam.colocationParamType ==
                   COLOCATE_WITH_TABLE_LIKE_OPT);

            citusTableParams.distributionMethod = DISTRIBUTE_BY_RANGE;
            citusTableParams.replicationModel = DecideDistTableReplicationModel(
                RANGE_DISTRIBUTED,
                distributedTableParams->colocationParam.colocateWithTableName);
            break;
        }

        case SINGLE_SHARD_DISTRIBUTED: {
            citusTableParams.distributionMethod = DISTRIBUTE_BY_NONE;
            citusTableParams.replicationModel = REPLICATION_MODEL_STREAMING;
            break;
        }

        case REFERENCE_TABLE: {
            citusTableParams.distributionMethod = DISTRIBUTE_BY_NONE;
            citusTableParams.replicationModel = REPLICATION_MODEL_2PC;
            break;
        }

        default: {
            ereport(ERROR, (errmsg("unexpected table type when deciding Citus "
                                   "table params")));
            break;
        }
    }

    return citusTableParams;
}

/*
 * PropagatePrerequisiteObjectsForDistributedTable ensures we can create shards
 * on all nodes by ensuring all dependent objects exist on all nodes.
 */
static void PropagatePrerequisiteObjectsForDistributedTable(Oid relationId)
{
    /*
     * Ensure that the sequences used in column defaults of the table
     * have proper types
     */
    EnsureRelationHasCompatibleSequenceTypes(relationId);

    /*
     * Distributed tables might have dependencies on various objects. Since we
     * create shards for a distributed table via multiple sessions, these objects
     * are created via their own connections and committed immediately, so they
     * become visible to all sessions creating shards.
     */
    ObjectAddress* tableAddress =
        static_cast<ObjectAddress*>(palloc0(sizeof(ObjectAddress)));
    ObjectAddressSet(*tableAddress, RelationRelationId, relationId);
    EnsureAllObjectDependenciesExistOnAllNodes(list_make1(tableAddress));
    TrackPropagatedTableAndSequences(relationId);
}

/*
 * EnsureSequenceTypeSupported ensures that the type of the column that uses
 * a sequence on its DEFAULT is consistent with previous uses (if any) of the
 * sequence in distributed tables.
 * If any other distributed table uses the input sequence, it checks whether
 * the types of the columns using the sequence match. If they don't, it errors out.
 * Otherwise, the condition is ensured.
 * Since the owner of the sequence may not be distributed yet, it should be added
 * explicitly.
 */
void EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid ownerRelationId)
{
    Oid attrDefOid;
    List* attrDefOids = GetAttrDefsFromSequence(seqOid);
#ifdef DISABLE_OG_COMMENTS
    /* openGauss does not have attrDefOid. */
    foreach_declared_oid(attrDefOid, attrDefOids)
    {
        ObjectAddress columnAddress = GetAttrDefaultColumnAddress(attrDefOid);

        /*
         * If another distributed table is using the same sequence
         * in one of its column defaults, make sure the types of the
         * columns match.
         *
         * We skip non-distributed tables, but we need to check the current
         * table as it might reference the same sequence multiple times.
         */
        if (columnAddress.objectId != ownerRelationId &&
            !IsCitusTable(columnAddress.objectId)) {
            continue;
        }
        Oid currentAttributeTypId =
            GetAttributeTypeOid(columnAddress.objectId, columnAddress.objectSubId);
        if (attributeTypeId != currentAttributeTypId) {
            char* sequenceName = generate_qualified_relation_name(seqOid);
            char* citusTableName =
                generate_qualified_relation_name(columnAddress.objectId);
            ereport(ERROR,
                    (errmsg("The sequence %s is already used for a different"
                            " type in column %d of the table %s",
                            sequenceName, columnAddress.objectSubId, citusTableName)));
        }
    }
#endif
}
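
/*
 * To illustrate the check above with a hypothetical scenario (not taken from
 * any test suite): suppose one sequence backs default values of differently
 * typed columns in two tables, e.g.
 *
 *   CREATE SEQUENCE s;
 *   CREATE TABLE t1 (a int DEFAULT nextval('s'));     -- int4 column
 *   CREATE TABLE t2 (a bigint DEFAULT nextval('s'));  -- int8 column
 *
 * If t1 is already distributed, distributing t2 would hit the error above
 * because the sequence "s" is already tied to an int4 column of a distributed
 * table.
 */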

/*
 * AlterSequenceType alters the given sequence's type to the given type.
 */
void AlterSequenceType(Oid seqOid, Oid typeOid)
{
#ifdef DISABLE_OG_COMMENTS
    Form_pg_sequence sequenceData = pg_get_sequencedef(seqOid);
    Oid currentSequenceTypeOid = sequenceData->seqtypid;
    if (currentSequenceTypeOid != typeOid) {
        AlterSeqStmt* alterSequenceStatement = makeNode(AlterSeqStmt);
        char* seqNamespace = get_namespace_name(get_rel_namespace(seqOid));
        char* seqName = get_rel_name(seqOid);
        alterSequenceStatement->sequence = makeRangeVar(seqNamespace, seqName, -1);
        Node* asTypeNode = (Node*)makeTypeNameFromOid(typeOid, -1);
        SetDefElemArg(alterSequenceStatement, "as", asTypeNode);
        ParseState* pstate = make_parsestate(NULL);
        AlterSequence(pstate, alterSequenceStatement);
        CommandCounterIncrement();
    }
#endif
}

/*
 * EnsureRelationHasCompatibleSequenceTypes ensures that the sequences used for
 * columns of the table have types compatible both with the column type on that
 * table and with the columns of all other distributed tables that use them.
 */
void EnsureRelationHasCompatibleSequenceTypes(Oid relationId)
{
    List* seqInfoList = NIL;

    GetDependentSequencesWithRelation(relationId, &seqInfoList, 0, DEPENDENCY_AUTO);
    EnsureDistributedSequencesHaveOneType(relationId, seqInfoList);
}

/*
 * EnsureDistributedSequencesHaveOneType first ensures that the type of the column
 * in which the sequence is used as default is supported for each sequence in input
 * dependentSequenceList, and then alters the sequence type if it is not the
 * same as the column type.
 */
static void EnsureDistributedSequencesHaveOneType(Oid relationId, List* seqInfoList)
{
    SequenceInfo* seqInfo = NULL;
    foreach_declared_ptr(seqInfo, seqInfoList)
    {
        if (!seqInfo->isNextValDefault) {
            /*
             * If a sequence is not used in a nextval() default, we don't need any
             * check; it is merely a dependent sequence via ALTER SEQUENCE .. OWNED BY col.
             */
            continue;
        }

        /*
         * We should make sure that the type of the column that uses
         * that sequence is supported
         */
        Oid sequenceOid = seqInfo->sequenceOid;
        AttrNumber attnum = seqInfo->attributeNumber;
        Oid attributeTypeId = GetAttributeTypeOid(relationId, attnum);
        EnsureSequenceTypeSupported(sequenceOid, attributeTypeId, relationId);

        /*
         * Alter the sequence's data type in the coordinator if needed.
         *
         * First, we should only change the sequence type if the column
         * is a supported sequence type. For example, if a sequence is used
         * in an expression which then becomes a text, we should not try to
         * alter the sequence type to text. Postgres only supports int2, int4
         * and int8 as the sequence type.
         *
         * A sequence's type is bigint by default and it doesn't change even if
         * it's used in an int column. We should change the type if needed,
         * and not allow future ALTER SEQUENCE ... TYPE ... commands for
         * sequences used as defaults in distributed tables.
         */
        if (attributeTypeId == INT2OID || attributeTypeId == INT4OID ||
            attributeTypeId == INT8OID) {
            AlterSequenceType(sequenceOid, attributeTypeId);
        }
    }
}

/*
 * DecideDistTableReplicationModel function decides which replication model should be
 * used for a distributed table depending on given distribution configuration.
 */
static char DecideDistTableReplicationModel(char distributionMethod,
                                            char* colocateWithTableName)
{
    Assert(distributionMethod != DISTRIBUTE_BY_NONE);

    if (!IsColocateWithDefault(colocateWithTableName) &&
        !IsColocateWithNone(colocateWithTableName)) {
        text* colocateWithTableNameText = cstring_to_text(colocateWithTableName);
        Oid colocatedRelationId = ResolveRelationId(colocateWithTableNameText, false);
        CitusTableCacheEntry* targetTableEntry =
            GetCitusTableCacheEntry(colocatedRelationId);
        char replicationModel = targetTableEntry->replicationModel;

        return replicationModel;
    } else if (distributionMethod == DISTRIBUTE_BY_HASH &&
               !DistributedTableReplicationIsEnabled()) {
        return REPLICATION_MODEL_STREAMING;
    } else {
        return REPLICATION_MODEL_COORDINATOR;
    }

    /* we should never reach this point */
    return REPLICATION_MODEL_INVALID;
}
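
/*
 * Informal summary of the decision above (mirroring the code, not a spec):
 *
 *   colocate_with points at a table          -> inherit that table's model
 *   hash distributed + table replication off -> REPLICATION_MODEL_STREAMING
 *   everything else                          -> REPLICATION_MODEL_COORDINATOR
 */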

/*
 * CreateHashDistributedTableShards creates shards of given hash distributed table.
 */
static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
                                             Oid colocatedTableId, bool localTableEmpty)
{
    bool useExclusiveConnection = false;

    /*
     * Decide whether to use exclusive connections per placement or not. Note that
     * if the local table is not empty, we cannot use sequential mode since the COPY
     * operation that'd load the data into shards currently requires exclusive
     * connections.
     */
    if (RegularTable(relationId)) {
        useExclusiveConnection = CanUseExclusiveConnections(relationId, localTableEmpty);
    }

    if (colocatedTableId != InvalidOid) {
        /*
         * We currently allow concurrent distribution of colocated tables (which
         * we probably should not be allowing because of foreign keys /
         * partitioning etc).
         *
         * We also prevent concurrent shard moves / copies / splits while creating
         * a colocated table.
         */
        AcquirePlacementColocationLock(colocatedTableId, ShareLock,
                                       "colocate distributed table");

        CreateColocatedShards(relationId, colocatedTableId, useExclusiveConnection);
    } else {
        /*
         * This path is only reached by create_distributed_table for the distributed
         * tables which will not be part of an existing colocation group. Therefore,
         * we can directly use ShardReplicationFactor global variable here.
         */
        CreateShardsWithRoundRobinPolicy(relationId, shardCount,
                                         Session_ctx::Vars().ShardReplicationFactor,
                                         useExclusiveConnection);
    }
}

/*
 * CreateSingleShardTableShard creates the shard of given single-shard
 * distributed table.
 */
static void CreateSingleShardTableShard(Oid relationId, Oid colocatedTableId,
                                        uint32 colocationId)
{
    if (colocatedTableId != InvalidOid) {
        /*
         * We currently allow concurrent distribution of colocated tables (which
         * we probably should not be allowing because of foreign keys /
         * partitioning etc).
         *
         * We also prevent concurrent shard moves / copies / splits while creating
         * a colocated table.
         */
        AcquirePlacementColocationLock(colocatedTableId, ShareLock,
                                       "colocate distributed table");

        /*
         * We don't need to force using exclusive connections because we're anyway
         * creating a single shard.
         */
        bool useExclusiveConnection = false;
        CreateColocatedShards(relationId, colocatedTableId, useExclusiveConnection);
    } else {
        CreateSingleShardTableShardWithRoundRobinPolicy(relationId, colocationId);
    }
}

/*
 * ColocationIdForNewTable returns a colocation id for the given table
 * according to the given configuration. If there is no such configuration, it
 * creates one and returns the colocation id of the newly created colocation
 * group. Note that DistributedTableParams and the distribution column Var
 * should be non-null only if CitusTableType corresponds to a distributed table.
 *
 * For append and range distributed tables, this function errors out if the
 * colocate_with option is specified, otherwise it directly returns
 * INVALID_COLOCATION_ID.
 *
 * For reference tables, it returns the common reference table colocation id.
 *
 * This function assumes its caller takes the necessary lock on relationId to
 * prevent concurrent changes to it.
 */
static uint32 ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
                                      DistributedTableParams* distributedTableParams,
                                      Var* distributionColumn)
{
    CitusTableParams citusTableParams =
        DecideCitusTableParams(tableType, distributedTableParams);

    uint32 colocationId = INVALID_COLOCATION_ID;

    if (tableType == APPEND_DISTRIBUTED || tableType == RANGE_DISTRIBUTED) {
        Assert(distributedTableParams->colocationParam.colocationParamType ==
               COLOCATE_WITH_TABLE_LIKE_OPT);
        char* colocateWithTableName =
            distributedTableParams->colocationParam.colocateWithTableName;
        if (!IsColocateWithDefault(colocateWithTableName)) {
            ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                            errmsg("cannot distribute relation"),
                            errdetail("Currently, colocate_with option is not supported "
                                      "for append / range distributed tables.")));
        }

        return colocationId;
    } else if (tableType == REFERENCE_TABLE) {
        return CreateReferenceTableColocationId();
    } else {
        /*
         * Get an exclusive lock on the colocation system catalog. Therefore, we
         * can be sure that there will be no modifications on the colocation table
         * until this transaction is committed.
         */

        Oid distributionColumnType =
            distributionColumn ? distributionColumn->vartype : InvalidOid;
        Oid distributionColumnCollation =
            distributionColumn ? get_typcollation(distributionColumnType) : InvalidOid;

        Assert(distributedTableParams->colocationParam.colocationParamType ==
               COLOCATE_WITH_TABLE_LIKE_OPT);
        char* colocateWithTableName =
            distributedTableParams->colocationParam.colocateWithTableName;

        /* get an advisory lock to serialize concurrent default group creations */
        if (IsColocateWithDefault(colocateWithTableName)) {
            AcquireColocationDefaultLock();
        }

        colocationId = FindColocateWithColocationId(
            relationId, citusTableParams.replicationModel, distributionColumnType,
            distributionColumnCollation, distributedTableParams->shardCount,
            distributedTableParams->shardCountIsStrict, colocateWithTableName);

        if (IsColocateWithDefault(colocateWithTableName) &&
            (colocationId != INVALID_COLOCATION_ID)) {
            /*
             * we can release advisory lock if there is already a default entry for given
             * params; else, we should keep it to prevent different default coloc entry
             * creation by concurrent operations.
             */
            ReleaseColocationDefaultLock();
        }

        if (colocationId == INVALID_COLOCATION_ID) {
            if (IsColocateWithDefault(colocateWithTableName) ||
                IsColocateWithNone(colocateWithTableName)) {
                /*
                 * Generate a new colocation ID and insert a pg_dist_colocation
                 * record.
                 */
                colocationId = CreateColocationGroup(
                    distributedTableParams->shardCount,
                    Session_ctx::Vars().ShardReplicationFactor, distributionColumnType,
                    distributionColumnCollation);
            }
        }
    }

    return colocationId;
}

/*
 * EnsureRelationCanBeDistributed checks whether Citus can safely distribute given
 * relation with the given configuration. We perform almost all safety checks for
 * distributing table here. If there is an unsatisfied requirement, we error out
 * and do not distribute the table.
 *
 * This function assumes callers have already acquired the necessary locks to ensure
 * there will not be any change in the given relation.
 */
static void EnsureRelationCanBeDistributed(Oid relationId, Var* distributionColumn,
                                           char distributionMethod, uint32 colocationId,
                                           char replicationModel)
{
    EnsureLocalTableEmptyIfNecessary(relationId, distributionMethod);

    /* if unsafe triggers are enabled, only reject unsupported triggers; otherwise reject all */
    if (Session_ctx::Vars().EnableUnsafeTriggers) {
        ErrorIfRelationHasUnsupportedTrigger(relationId);
    } else {
        EnsureRelationHasNoTriggers(relationId);
    }

    /* we assume callers took necessary locks */
    Relation relation = relation_open(relationId, NoLock);
    TupleDesc relationDesc = RelationGetDescr(relation);
    char* relationName = RelationGetRelationName(relation);

    ErrorIfTableIsACatalogTable(relation);

    /* verify target relation is not distributed by a generated stored column */
    if (distributionMethod != DISTRIBUTE_BY_NONE &&
        DistributionColumnUsesGeneratedStoredColumn(relationDesc, distributionColumn)) {
        ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                        errmsg("cannot distribute relation: %s", relationName),
                        errdetail("Distribution column must not use GENERATED ALWAYS "
                                  "AS (...) STORED.")));
    }

    /* verify target relation is not distributed by a column of type numeric with negative
     * scale */
    if (distributionMethod != DISTRIBUTE_BY_NONE &&
        DistributionColumnUsesNumericColumnNegativeScale(relationDesc,
                                                         distributionColumn)) {
        ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                        errmsg("cannot distribute relation: %s", relationName),
                        errdetail("Distribution column must not use numeric type "
                                  "with negative scale")));
    }

    /* check for support function needed by specified partition method */
    if (distributionMethod == DISTRIBUTE_BY_HASH) {
        Oid hashSupportFunction =
            SupportFunctionForColumn(distributionColumn, HASH_AM_OID, HASHPROC);
        if (hashSupportFunction == InvalidOid) {
            ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION),
                            errmsg("could not identify a hash function for type %s",
                                   format_type_be(distributionColumn->vartype)),
#ifdef DISABLE_OG_COMMENTS
                            errdatatype(distributionColumn->vartype),
#endif
                            errdetail("Partition column types must have a hash function "
                                      "defined to use hash partitioning.")));
        }
    } else if (distributionMethod == DISTRIBUTE_BY_RANGE) {
        Oid btreeSupportFunction =
            SupportFunctionForColumn(distributionColumn, BTREE_AM_OID, BTORDER_PROC);
        if (btreeSupportFunction == InvalidOid) {
            ereport(ERROR,
                    (errcode(ERRCODE_UNDEFINED_FUNCTION),
                     errmsg("could not identify a comparison function for type %s",
                            format_type_be(distributionColumn->vartype)),
#ifdef DISABLE_OG_COMMENTS
                     errdatatype(distributionColumn->vartype),
#endif
                     errdetail("Partition column types must have a comparison function "
                               "defined to use range partitioning.")));
        }
    }

    /* partitioned tables cannot be distributed */
    if (PartitionedTableNoLock(relationId)) {
        ereport(ERROR,
                (errmsg("Spq does not support distributing partitioned relation \"%s\"",
                        relationName)));
    }

    ErrorIfUnsupportedConstraint(relation, distributionMethod, replicationModel,
                                 distributionColumn, colocationId);

    ErrorIfUnsupportedPolicy(relation);
    relation_close(relation, NoLock);
}

/*
 * ErrorIfTemporaryTable errors out if the given table is a temporary table.
 */
static void ErrorIfTemporaryTable(Oid relationId)
{
    if (get_rel_persistence(relationId) == RELPERSISTENCE_TEMP) {
        ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                        errmsg("cannot distribute a temporary table")));
    }
}

/*
 * ErrorIfTableIsACatalogTable is a helper function to error out for citus
 * table creation from a catalog table.
 */
void ErrorIfTableIsACatalogTable(Relation relation)
{
    if (relation->rd_rel->relnamespace != PG_CATALOG_NAMESPACE) {
        return;
    }

    ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                    errmsg("cannot create a citus table from a catalog table")));
}

/*
 * EnsureLocalTableEmptyIfNecessary errors out if the local table should be
 * empty according to ShouldLocalTableBeEmpty but it is not.
 */
static void EnsureLocalTableEmptyIfNecessary(Oid relationId, char distributionMethod)
{
    if (ShouldLocalTableBeEmpty(relationId, distributionMethod)) {
        EnsureLocalTableEmpty(relationId);
    }
}

/*
 * ShouldLocalTableBeEmpty returns true if the local table should be empty
 * before creating a citus table.
 * In some cases, it is possible and safe to send local data to shards while
 * distributing the table. In those cases, we can distribute non-empty local
 * tables. This function checks the distributionMethod and relation kind to
 * see whether we need to ensure emptiness of the local table.
 */
static bool ShouldLocalTableBeEmpty(Oid relationId, char distributionMethod)
{
    bool shouldLocalTableBeEmpty = false;
    if (distributionMethod != DISTRIBUTE_BY_HASH &&
        distributionMethod != DISTRIBUTE_BY_NONE) {
        /*
         * We only support hash distributed tables and reference tables
         * for initial data loading
         */
        shouldLocalTableBeEmpty = true;
    } else if (!RegularTable(relationId)) {
        /*
         * We only support tables and partitioned tables for initial
         * data loading
         */
        shouldLocalTableBeEmpty = true;
    }

    return shouldLocalTableBeEmpty;
}

/*
 * EnsureLocalTableEmpty errors out if the local table is not empty.
 */
static void EnsureLocalTableEmpty(Oid relationId)
{
    char* relationName = get_rel_name(relationId);
    bool localTableEmpty = TableEmpty(relationId);

    if (!localTableEmpty) {
        ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
                        errmsg("cannot distribute relation \"%s\"", relationName),
                        errdetail("Relation \"%s\" contains data.", relationName),
                        errhint("Empty your table before distributing it.")));
    }
}

/*
 * EnsureDistributableTable ensures the given table type is appropriate to
 * be distributed. Table type should be regular or citus local table.
 */
static void EnsureDistributableTable(Oid relationId)
{
    bool isLocalTable = IsCitusTableType(relationId, CITUS_LOCAL_TABLE);
    bool isRegularTable = !IsCitusTableType(relationId, ANY_CITUS_TABLE_TYPE);

    if (!isLocalTable && !isRegularTable) {
        char* relationName = get_rel_name(relationId);

        ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
                        errmsg("table \"%s\" is already distributed", relationName)));
    }
}

/*
 * EnsureTableNotDistributed errors out if the table is distributed.
 */
void EnsureTableNotDistributed(Oid relationId)
{
    char* relationName = get_rel_name(relationId);

    bool isCitusTable = IsCitusTable(relationId);

    if (isCitusTable) {
        ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
                        errmsg("table \"%s\" is already distributed", relationName)));
    }
}

/*
 * EnsureRelationHasNoTriggers errors out if the given table has triggers on
 * it. See also the comment on GetExplicitTriggerIdList for the triggers this
 * function errors out for.
 */
static void EnsureRelationHasNoTriggers(Oid relationId)
{
    List* explicitTriggerIds = GetExplicitTriggerIdList(relationId);

    if (list_length(explicitTriggerIds) > 0) {
        char* relationName = get_rel_name(relationId);

        Assert(relationName != NULL);
        ereport(ERROR, (errmsg("cannot distribute relation \"%s\" because it "
                               "has triggers",
                               relationName),
                        errhint("Consider dropping all the triggers on \"%s\" "
                                "and retry.",
                                relationName)));
    }
}

/*
 * LookupDistributionMethod maps the oids of spq.distribution_type enum
 * values to pg_dist_partition.partmethod values.
 *
 * The passed in oid has to belong to a value of spq.distribution_type.
 */
char LookupDistributionMethod(Oid distributionMethodOid)
{
    char distributionMethod = 0;

    HeapTuple enumTuple =
        SearchSysCache1(ENUMOID, ObjectIdGetDatum(distributionMethodOid));
    if (!HeapTupleIsValid(enumTuple)) {
        ereport(ERROR,
                (errmsg("invalid internal value for enum: %u", distributionMethodOid)));
    }

    Form_pg_enum enumForm = (Form_pg_enum)GETSTRUCT(enumTuple);
    const char* enumLabel = NameStr(enumForm->enumlabel);

    if (strncmp(enumLabel, "hash", NAMEDATALEN) == 0) {
        distributionMethod = DISTRIBUTE_BY_HASH;
#ifdef DISABLE_OG_COMMENTS
    } else if (strncmp(enumLabel, "append", NAMEDATALEN) == 0) {
        distributionMethod = DISTRIBUTE_BY_APPEND;
    } else if (strncmp(enumLabel, "range", NAMEDATALEN) == 0) {
        distributionMethod = DISTRIBUTE_BY_RANGE;
#endif
    } else {
        ereport(ERROR, (errmsg("invalid label for enum: %s", enumLabel)));
    }

    ReleaseSysCache(enumTuple);

    return distributionMethod;
}
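
/*
 * For reference, the enum that LookupDistributionMethod maps from presumably
 * looks like the sketch below; the actual definition lives in the extension's
 * SQL scripts, and only the 'hash' label is accepted when DISABLE_OG_COMMENTS
 * is not defined:
 *
 *   CREATE TYPE spq.distribution_type AS ENUM ('hash', 'append', 'range');
 *
 * The oid of the 'hash' label, for instance, maps to DISTRIBUTE_BY_HASH.
 */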

/*
 * SupportFunctionForColumn locates a support function given a column, an access
 * method, and an id of a support function. This function returns InvalidOid if
 * there is no support function for the operator class family of the column, but
 * if the data type of the column has no default operator class whatsoever, this
 * function errors out.
 */
static Oid SupportFunctionForColumn(Var* partitionColumn, Oid accessMethodId,
                                    int16 supportFunctionNumber)
{
    Oid columnOid = partitionColumn->vartype;
    Oid operatorClassId = GetDefaultOpClass(columnOid, accessMethodId);

    /* currently only support using the default operator class */
    if (operatorClassId == InvalidOid) {
        ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT),
                        errmsg("data type %s has no default operator class for specified"
                               " partition method",
                               format_type_be(columnOid)),
#ifdef DISABLE_OG_COMMENTS
                        errdatatype(columnOid),
#endif
                        errdetail("Partition column types must have a default operator"
                                  " class defined.")));
    }

    Oid operatorFamilyId = get_opclass_family(operatorClassId);
    Oid operatorClassInputType = get_opclass_input_type(operatorClassId);
    Oid supportFunctionOid =
        get_opfamily_proc(operatorFamilyId, operatorClassInputType,
                          operatorClassInputType, supportFunctionNumber);

    return supportFunctionOid;
}

/*
 * TableEmpty checks whether the given table contains any rows. It returns
 * false if the table contains data and true otherwise.
 */
bool TableEmpty(Oid tableId)
{
    Oid schemaId = get_rel_namespace(tableId);
    char* schemaName = get_namespace_name(schemaId);
    char* tableName = get_rel_name(tableId);
    char* tableQualifiedName = quote_qualified_identifier(schemaName, tableName);

    StringInfo selectTrueQueryString = makeStringInfo();

    bool readOnly = true;

    int spiConnectionResult = SPI_connect();
    if (spiConnectionResult != SPI_OK_CONNECT) {
        ereport(ERROR, (errmsg("could not connect to SPI manager")));
    }

    appendStringInfo(selectTrueQueryString, SELECT_TRUE_QUERY, tableQualifiedName);

    int spiQueryResult = SPI_execute(selectTrueQueryString->data, readOnly, 0);
    if (spiQueryResult != SPI_OK_SELECT) {
        ereport(ERROR, (errmsg("execution was not successful \"%s\"",
                               selectTrueQueryString->data)));
    }

    /* we expect that SELECT TRUE query will return single value in a single row OR empty
     * set */
    Assert(SPI_processed == 1 || SPI_processed == 0);

    bool localTableEmpty = !SPI_processed;

    SPI_finish();

    return localTableEmpty;
}
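
/*
 * Assuming SELECT_TRUE_QUERY expands to a probe along the lines of
 *
 *   SELECT TRUE FROM %s LIMIT 1;
 *
 * (a sketch; the macro is defined elsewhere), SPI_processed is 1 when the
 * table has at least one row and 0 when it is empty, so negating it above
 * yields the emptiness flag.
 */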

/*
 * CanUseExclusiveConnections checks if we can open parallel connections
 * while creating shards. We simply error out if we need to execute
 * sequentially but there is data in the table, since we cannot copy the
 * data to shards sequentially.
 */
static bool CanUseExclusiveConnections(Oid relationId, bool localTableEmpty)
{
    bool hasForeignKeyToReferenceTable = HasForeignKeyToReferenceTable(relationId);
    bool shouldRunSequential =
        Session_ctx::Vars().MultiShardConnectionType == SEQUENTIAL_CONNECTION ||
        hasForeignKeyToReferenceTable;

    if (shouldRunSequential && ParallelQueryExecutedInTransaction()) {
        /*
         * We decided to use sequential execution. It's either because relation
         * has a pre-existing foreign key to a reference table or because we
         * decided to use sequential execution due to a query executed in the
         * current xact beforehand.
         * We have specific error messages for either case.
         */

        char* relationName = get_rel_name(relationId);

        if (hasForeignKeyToReferenceTable) {
            /*
             * If there has already been a parallel query executed, the sequential mode
             * would still use the already opened parallel connections to the workers,
             * thus contradicting our purpose of using sequential mode.
             */
            ereport(ERROR, (errmsg("cannot distribute relation \"%s\" in this "
                                   "transaction because it has a foreign key to "
                                   "a reference table",
                                   relationName),
                            errdetail("If a hash distributed table has a foreign key "
                                      "to a reference table, it has to be created "
                                      "in sequential mode before any parallel commands "
                                      "have been executed in the same transaction"),
                            errhint("Try re-running the transaction with "
                                    "\"SET LOCAL spq.multi_shard_modify_mode TO "
                                    "\'sequential\';\"")));
        } else if (Session_ctx::Vars().MultiShardConnectionType ==
                   SEQUENTIAL_CONNECTION) {
            ereport(ERROR, (errmsg("cannot distribute \"%s\" in sequential mode because "
                                   "a parallel query was executed in this transaction",
                                   relationName),
                            errhint("If you have manually set "
                                    "spq.multi_shard_modify_mode to 'sequential', "
                                    "try with 'parallel' option. ")));
        }
    } else if (shouldRunSequential) {
        return false;
    } else if (!localTableEmpty || IsMultiStatementTransaction()) {
        return true;
    }

    return false;
}

/*
 * CreateTruncateTrigger creates a truncate trigger on table identified by relationId
 * and assigns spq_truncate_trigger() as handler.
 */
void CreateTruncateTrigger(Oid relationId)
{
    StringInfo triggerName = makeStringInfo();
    bool internal = true;

    appendStringInfoString(triggerName, "truncate_trigger");

    CreateTrigStmt* trigger = makeNode(CreateTrigStmt);
    trigger->trigname = triggerName->data;
    trigger->relation = NULL;
    trigger->funcname = SystemFuncName(CITUS_TRUNCATE_TRIGGER_NAME);
    trigger->args = NIL;
    trigger->row = false;
    trigger->timing = TRIGGER_TYPE_AFTER;
    trigger->events = TRIGGER_TYPE_TRUNCATE;
    trigger->columns = NIL;
    trigger->whenClause = NULL;
    trigger->isconstraint = false;

    CreateTrigger(trigger, NULL, relationId, InvalidOid, InvalidOid, InvalidOid, false);
}
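
/*
 * The statement built above is roughly equivalent to the DDL below (a sketch,
 * assuming CITUS_TRUNCATE_TRIGGER_NAME resolves to "spq_truncate_trigger"):
 *
 *   CREATE TRIGGER truncate_trigger AFTER TRUNCATE ON <table>
 *       FOR EACH STATEMENT EXECUTE PROCEDURE spq_truncate_trigger();
 */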

/*
 * RegularTable returns true if the given table's relation kind is
 * RELKIND_RELATION or PARTTYPE_PARTITIONED_RELATION; otherwise it returns false.
 */
bool RegularTable(Oid relationId)
{
    char relationKind = get_rel_relkind(relationId);

    if (relationKind == RELKIND_RELATION ||
        relationKind == PARTTYPE_PARTITIONED_RELATION) {
        return true;
    }

    return false;
}

/*
 * CopyLocalDataIntoShards is a wrapper around CopyFromLocalTableIntoDistTable
 * to copy data from the local table, which is hidden after converting it to a
 * distributed table, into the shards of the distributed table.
 *
 * After copying local data into the distributed table, the local data remains
 * in place and should be truncated at a later time.
 */
static void CopyLocalDataIntoShards(Oid distributedTableId)
{
    uint64 rowsCopied =
        CopyFromLocalTableIntoDistTable(distributedTableId, distributedTableId);
    if (rowsCopied > 0) {
        char* qualifiedRelationName =
            generate_qualified_relation_name(distributedTableId);
        ereport(NOTICE, (errmsg("copying the data has completed"),
                         errdetail("The local data in the table is no longer visible, "
                                   "but is still on disk."),
                         errhint("To remove the local data, run: SELECT "
                                 "truncate_local_data_after_distributing_table($$%s$$)",
                                 qualifiedRelationName)));
    }
}

/*
 * CopyFromLocalTableIntoDistTable copies data from given local table into
 * the shards of given distributed table.
 *
 * For partitioned tables, this function returns without copying the data
 * because we call this function for both partitioned tables and its partitions.
 * Returning early saves us from copying data to workers twice.
 *
 * This function uses CitusCopyDestReceiver to invoke the distributed COPY logic.
 * We cannot use a regular COPY here since that cannot read from a table. Instead
 * we read from the table and pass each tuple to the CitusCopyDestReceiver which
 * opens a connection and starts a COPY for each shard placement that will have
 * data.
 *
 * We assume that the local table might indeed be a distributed table and the
 * caller would want to read the local data from the shell table in that case.
 * For this reason, to keep it simple, we perform a heap scan directly on the
 * table instead of using SELECT.
 */
uint64 CopyFromLocalTableIntoDistTable(Oid localTableId, Oid distributedTableId)
{
    /* take an ExclusiveLock to block all operations except SELECT */
    Relation localRelation = table_open(localTableId, ExclusiveLock);

    /*
     * Skip copying from partitioned tables, we will copy the data from
     * partition to partition's shards.
     */
    if (PartitionedTable(distributedTableId)) {
        table_close(localRelation, NoLock);

        return 0;
    }

    /*
     * All writes have finished, make sure that we can see them by using the
     * latest snapshot. We use GetLatestSnapshot instead of
     * GetTransactionSnapshot since the latter would not reveal all writes
     * in serializable or repeatable read mode. Note that subsequent reads
     * from the distributed table would reveal those writes, temporarily
     * violating the isolation level. However, this seems preferable over
     * dropping the writes entirely.
     */
    PushActiveSnapshot(GetLatestSnapshot());

    Relation distributedRelation = RelationIdGetRelation(distributedTableId);

    /* get the table columns for distributed table */
    TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation);
    List* columnNameList = TupleDescColumnNameList(destTupleDescriptor);

    RelationClose(distributedRelation);

    int partitionColumnIndex = INVALID_PARTITION_COLUMN_INDEX;

    /* determine the partition column in the tuple descriptor */
    Var* partitionColumn = PartitionColumn(distributedTableId, 0);
    if (partitionColumn != NULL) {
        partitionColumnIndex = partitionColumn->varattno - 1;
    }

    /* create tuple slot for local relation */
    TupleDesc sourceTupleDescriptor = RelationGetDescr(localRelation);
    TupleTableSlot* slot = MakeSingleTupleTableSlot(sourceTupleDescriptor, false,
                                                    sourceTupleDescriptor->td_tam_ops);

    /* initialise per-tuple memory context */
    EState* estate = CreateExecutorState();
    ExprContext* econtext = GetPerTupleExprContext(estate);
    econtext->ecxt_scantuple = slot;
    const bool nonPublishableData = false;
    DestReceiver* copyDest = (DestReceiver*)CreateCitusCopyDestReceiver(
        distributedTableId, columnNameList, partitionColumnIndex, estate, NULL,
        nonPublishableData);

    /* initialise state for writing to shards, we'll open connections on demand */
    copyDest->rStartup(copyDest, 0, sourceTupleDescriptor);

    uint64 rowsCopied =
        DoCopyFromLocalTableIntoShards(localRelation, copyDest, slot, estate);

    /* finish writing into the shards */
    copyDest->rShutdown(copyDest);
    copyDest->rDestroy(copyDest);

    /* free memory and close the relation */
    ExecDropSingleTupleTableSlot(slot);
    FreeExecutorState(estate);
    table_close(localRelation, NoLock);

    PopActiveSnapshot();

    return rowsCopied;
}

/*
 * DoCopyFromLocalTableIntoShards performs a copy operation
 * from local tables into shards.
 *
 * Returns the number of rows copied.
 */
static uint64 DoCopyFromLocalTableIntoShards(Relation localRelation,
                                             DestReceiver* copyDest, TupleTableSlot* slot,
                                             EState* estate)
{
    /* begin reading from local table */
    TableScanDesc scan = tableam_scan_begin(localRelation, GetActiveSnapshot(), 0, NULL);

    MemoryContext oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));

    uint64 rowsCopied = 0;
    while (tableam_scan_getnextslot(scan, ForwardScanDirection, slot)) {
        /* send the tuple to a shard */
        copyDest->receiveSlot(slot, copyDest);

        /* clear tuple memory */
        ResetPerTupleExprContext(estate);

        /* make sure we roll back on cancellation */
        CHECK_FOR_INTERRUPTS();

        if (rowsCopied == 0) {
            ereport(NOTICE, (errmsg("Copying data from local table...")));
        }

        rowsCopied++;

        if (rowsCopied % LOG_PER_TUPLE_AMOUNT == 0) {
            ereport(DEBUG1, (errmsg("Copied " UINT64_FORMAT " rows", rowsCopied)));
        }
    }

    if (rowsCopied % LOG_PER_TUPLE_AMOUNT != 0) {
        ereport(DEBUG1, (errmsg("Copied " UINT64_FORMAT " rows", rowsCopied)));
    }

    MemoryContextSwitchTo(oldContext);

    /* finish reading from the local table */
    tableam_scan_end(scan);

    return rowsCopied;
}

/*
 * TupleDescColumnNameList returns a list of column names for the given tuple
 * descriptor as plain strings.
 */
static List* TupleDescColumnNameList(TupleDesc tupleDescriptor)
{
    List* columnNameList = NIL;

    for (int columnIndex = 0; columnIndex < tupleDescriptor->natts; columnIndex++) {
        Form_pg_attribute currentColumn = TupleDescAttr(tupleDescriptor, columnIndex);
        char* columnName = NameStr(currentColumn->attname);

        if (currentColumn->attisdropped || IsGeneratedStoredColumn(currentColumn)) {
            continue;
        }

        columnNameList = lappend(columnNameList, columnName);
    }

    return columnNameList;
}

/*
 * is_valid_numeric_typmod checks if the typmod value is valid
 *
 * Because of the offset, valid numeric typmods are at least VARHDRSZ
 *
 * Copied from PG. See numeric.c for understanding how this works.
 */
static bool is_valid_numeric_typmod(int32 typmod)
{
    return typmod >= (int32)VARHDRSZ;
}

/*
 * numeric_typmod_scale extracts the scale from a numeric typmod.
 *
 * Copied from PG. See numeric.c for understanding how this works.
 */
static int numeric_typmod_scale(int32 typmod)
{
    return (((typmod - VARHDRSZ) & 0x7ff) ^ 1024) - 1024;
}
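
/*
 * Worked example of the extraction above: PG packs a numeric typmod as
 * ((precision << 16) | (scale & 0x7ff)) + VARHDRSZ, so for numeric(5,-2) the
 * stored 11-bit scale field is (-2) & 0x7ff = 0x7fe = 2046. Extracting it:
 *
 *   (2046 ^ 1024) - 1024 = 1022 - 1024 = -2
 *
 * i.e. the XOR/subtract pair sign-extends the 11-bit two's-complement field
 * back to a (possibly negative) int.
 */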

/*
 * DistributionColumnUsesNumericColumnNegativeScale returns whether a given relation uses
 * numeric data type with negative scale on distribution column
 */
static bool DistributionColumnUsesNumericColumnNegativeScale(TupleDesc relationDesc,
                                                             Var* distributionColumn)
{
    Form_pg_attribute attributeForm =
        TupleDescAttr(relationDesc, distributionColumn->varattno - 1);

    if (attributeForm->atttypid == NUMERICOID &&
        is_valid_numeric_typmod(attributeForm->atttypmod) &&
        numeric_typmod_scale(attributeForm->atttypmod) < 0) {
        return true;
    }

    return false;
}

/*
 * DistributionColumnUsesGeneratedStoredColumn returns whether a given relation uses
 * GENERATED ALWAYS AS (...) STORED on distribution column
 */
static bool DistributionColumnUsesGeneratedStoredColumn(TupleDesc relationDesc,
                                                        Var* distributionColumn)
{
    Form_pg_attribute attributeForm =
        TupleDescAttr(relationDesc, distributionColumn->varattno - 1);
    if (IsGeneratedStoredColumn(attributeForm)) {
        return true;
    }
    return false;
}

/*
 * ErrorIfForeignTable errors out if the relation with given relationOid
 * is a foreign table.
 */
static void ErrorIfForeignTable(Oid relationOid)
{
    if (IsForeignTable(relationOid)) {
        char* relname = get_rel_name(relationOid);
        char* qualifiedRelname = generate_qualified_relation_name(relationOid);
        ereport(ERROR, (errmsg("foreign tables cannot be distributed"),
                        (errhint("Can add foreign table \"%s\" to metadata by running: "
                                 "SELECT citus_add_local_table_to_metadata($$%s$$);",
                                 relname, qualifiedRelname))));
    }
}

/*
 * ErrorIfUnloggedTable errors out if the relation with the given relationId
 * is an unlogged table.
 */
static void ErrorIfUnloggedTable(Oid relationId)
{
    if (get_rel_persistence(relationId) == RELPERSISTENCE_UNLOGGED) {
        ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                        errmsg("cannot distribute a unlogged table")));
    }
}
